Spaces:
Running
Running
| """ | |
| Audio utilities for processing and file I/O | |
| Handles loading, saving, and processing audio files | |
| """ | |
| import logging | |
| import numpy as np | |
| import soundfile as sf | |
| import librosa | |
| from typing import Tuple, Optional | |
| import tempfile | |
| import os | |
logger = logging.getLogger(__name__)


class AudioProcessor:
    """Stateless helpers for loading, saving and processing audio.

    Every method is a ``@staticmethod``: none of them reads instance state,
    so they can be called either on the class
    (``AudioProcessor.load_audio(path)``) or on an instance.
    """

    # File extensions (lower-case, without the dot) accepted by
    # ``validate_audio_file``.
    SUPPORTED_FORMATS = ['wav', 'mp3', 'm4a', 'flac', 'ogg']
    DEFAULT_SAMPLE_RATE = 24000  # For XTTS-v2

    @staticmethod
    def load_audio(
        file_path: str,
        sr: Optional[int] = None,
        mono: bool = True
    ) -> Tuple[np.ndarray, int]:
        """
        Load an audio file with librosa.

        Args:
            file_path: Path to audio file
            sr: Target sample rate (None = keep the file's original rate)
            mono: Convert to mono if True

        Returns:
            Tuple of (audio_waveform, sample_rate)

        Raises:
            Exception: Whatever ``librosa.load`` raises; the error is logged
                and re-raised unchanged.
        """
        logger.info(f"Loading audio from: {file_path}")
        try:
            # Load with librosa for flexibility
            audio, sample_rate = librosa.load(
                file_path,
                sr=sr,
                mono=mono
            )
            logger.info(f"Audio loaded. Shape: {audio.shape}, SR: {sample_rate}")
            return audio, sample_rate
        except Exception as e:
            logger.error(f"Error loading audio: {str(e)}")
            raise

    @staticmethod
    def save_audio(
        audio_waveform: np.ndarray,
        sample_rate: int,
        output_path: str,
        subtype: str = 'PCM_16'
    ) -> str:
        """
        Save audio to a file with soundfile.

        NOTE(review): ``load_audio(mono=False)`` presumably yields
        (channels, samples) while soundfile expects (frames, channels);
        confirm whether multi-channel callers transpose before saving.

        Args:
            audio_waveform: Audio waveform array
            sample_rate: Sample rate
            output_path: Output file path; may be a bare file name
            subtype: Audio subtype (PCM_16, PCM_24, PCM_32, FLOAT)

        Returns:
            Path to saved file

        Raises:
            Exception: Any error from directory creation or ``sf.write``;
                the error is logged and re-raised unchanged.
        """
        logger.info(f"Saving audio to: {output_path}")
        try:
            # Ensure output directory exists. A bare file name has an empty
            # dirname, and os.makedirs('') raises FileNotFoundError, so the
            # call is skipped in that case.
            output_dir = os.path.dirname(output_path)
            if output_dir:
                os.makedirs(output_dir, exist_ok=True)
            # Save audio
            sf.write(output_path, audio_waveform, sample_rate, subtype=subtype)
            logger.info(f"Audio saved successfully. Size: {os.path.getsize(output_path)} bytes")
            return output_path
        except Exception as e:
            logger.error(f"Error saving audio: {str(e)}")
            raise

    @staticmethod
    def resample_audio(
        audio: np.ndarray,
        orig_sr: int,
        target_sr: int
    ) -> np.ndarray:
        """
        Resample audio to target sample rate.

        Args:
            audio: Audio waveform
            orig_sr: Original sample rate
            target_sr: Target sample rate

        Returns:
            Resampled audio; the input object itself (not a copy) when the
            two rates are already equal.
        """
        if orig_sr == target_sr:
            return audio
        logger.info(f"Resampling from {orig_sr} to {target_sr}")
        return librosa.resample(audio, orig_sr=orig_sr, target_sr=target_sr)

    @staticmethod
    def concatenate_audio(*audio_arrays) -> np.ndarray:
        """
        Concatenate multiple audio arrays along the first axis.

        Args:
            *audio_arrays: Variable number of audio arrays

        Returns:
            Concatenated audio array

        Raises:
            ValueError: Propagated from ``np.concatenate`` (for example when
                no arrays are given).
        """
        logger.info(f"Concatenating {len(audio_arrays)} audio segments")
        return np.concatenate(audio_arrays)

    @staticmethod
    def get_audio_duration(audio: np.ndarray, sr: int) -> float:
        """
        Get duration of audio in seconds.

        The sample count is ``len(audio)``, i.e. the size of the first axis,
        which is the number of samples for a 1-D (mono) waveform.

        Args:
            audio: Audio waveform
            sr: Sample rate

        Returns:
            Duration in seconds
        """
        return len(audio) / sr

    @staticmethod
    def validate_audio_file(file_path: str) -> bool:
        """
        Validate if file has a supported audio extension.

        Only the file name's extension is inspected (case-insensitively);
        the file is not opened. Names without an extension are rejected.

        Args:
            file_path: Path to audio file

        Returns:
            True if valid, False otherwise
        """
        # splitext only looks at the final path component, so dots in
        # directory names and extension-less names are handled correctly.
        ext = os.path.splitext(file_path)[1].lstrip('.').lower()
        is_valid = ext in AudioProcessor.SUPPORTED_FORMATS
        if not is_valid:
            logger.warning(f"Unsupported format: {ext}")
        return is_valid

    @staticmethod
    def create_temp_audio_file(suffix: str = '.wav') -> str:
        """
        Create an empty temporary audio file that persists after return.

        The caller owns the file and should remove it, e.g. with
        ``cleanup_temp_file``.

        Args:
            suffix: File name suffix (extension) for the temporary file

        Returns:
            Path to temporary file
        """
        # delete=False keeps the file on disk after the handle is closed;
        # closing the handle here avoids leaking an open file descriptor.
        with tempfile.NamedTemporaryFile(suffix=suffix, delete=False) as temp_file:
            temp_path = temp_file.name
        logger.info(f"Created temporary file: {temp_path}")
        return temp_path

    @staticmethod
    def cleanup_temp_file(file_path: str):
        """
        Delete temporary file safely.

        Best effort: a missing file is ignored and any failure is logged as
        a warning instead of being raised.

        Args:
            file_path: Path to file to delete
        """
        try:
            if os.path.exists(file_path):
                os.remove(file_path)
                logger.info(f"Deleted temporary file: {file_path}")
        except Exception as e:
            logger.warning(f"Could not delete file {file_path}: {str(e)}")

    @staticmethod
    def normalize_audio(audio: np.ndarray, target_db: float = -20.0) -> np.ndarray:
        """
        Normalize audio so that its RMS level matches the target.

        Args:
            audio: Audio waveform
            target_db: Target RMS level in dB relative to full scale (1.0)

        Returns:
            Normalized audio, hard-limited to [-1.0, 1.0]. Empty or
            all-zero input is returned unchanged.
        """
        # Nothing to scale; also avoids NumPy's mean-of-empty warning.
        if audio.size == 0:
            return audio
        # Calculate RMS in float64 so integer PCM input cannot overflow
        # when squared.
        rms = float(np.sqrt(np.mean(np.square(audio, dtype=np.float64))))
        if rms == 0:
            return audio
        # Convert target db to linear scale
        target_linear = 10 ** (target_db / 20.0)
        # Scale audio. The gain is a plain Python float so that float32
        # input stays float32.
        gain = target_linear / rms
        normalized = audio * gain
        # Hard-limit peaks that exceed full scale after the gain is applied
        normalized = np.clip(normalized, -1.0, 1.0)
        logger.info(f"Audio normalized to {target_db} dB")
        return normalized