Spaces:
Running
on
Zero
Running
on
Zero
| """ | |
| Audio I/O utilities: Read, write, and validate audio files. | |
| Handles m4a and wav formats with format validation and error handling. | |
| """ | |
| import logging | |
| from pathlib import Path | |
| from typing import Optional, Tuple | |
| import numpy as np | |
| logger = logging.getLogger(__name__) | |
class AudioIOError(Exception):
    """Error raised by this module when an audio file cannot be read, written, probed or converted."""
def read_audio(file_path: str, target_sr: Optional[int] = None) -> Tuple[np.ndarray, int]:
    """
    Read an audio file and return its waveform and sample rate.

    The file is first opened with soundfile. If that fails and the extension is
    .m4a, .aac or .mp4, the file is decoded to a temporary mono WAV with FFmpeg
    and that WAV is read instead. Multi-channel audio is averaged down to mono.

    Args:
        file_path: Path to audio file
        target_sr: Target sample rate (resamples if different), None = keep original

    Returns:
        Tuple of (audio_array, sample_rate)
        - audio_array: 1D numpy array of audio samples (float32, mono)
        - sample_rate: Sample rate in Hz

    Raises:
        AudioIOError: If the file is missing, cannot be decoded, or FFmpeg is
            missing or fails during the M4A/AAC fallback
    """
    import subprocess
    import tempfile

    file_path = Path(file_path)
    if not file_path.exists():
        raise AudioIOError(f"Audio file not found: {file_path}")

    import soundfile as sf

    try:
        # Try reading directly with soundfile
        audio, sr = sf.read(str(file_path), dtype="float32")
    except Exception as e:
        if file_path.suffix.lower() not in (".m4a", ".aac", ".mp4"):
            # Not a format we know how to hand to FFmpeg: report the original error
            raise AudioIOError(f"Failed to read audio file {file_path}: {str(e)}") from e

        logger.debug(f"Converting {file_path.suffix} to WAV for reading...")

        # Reserve a temporary WAV path; the handle is closed so FFmpeg can write to it
        with tempfile.NamedTemporaryFile(suffix=".wav", delete=False) as tmp_wav:
            tmp_wav_path = tmp_wav.name

        try:
            cmd = ["ffmpeg", "-i", str(file_path)]
            if target_sr:
                # Only resample when the caller asked for a rate; otherwise the
                # decoded WAV keeps the source rate, as the docstring promises
                cmd += ["-ar", str(target_sr)]
            cmd += [
                "-ac",
                "1",  # Mono
                "-y",  # Overwrite
                tmp_wav_path,
            ]
            subprocess.run(cmd, capture_output=True, text=True, check=True)

            # Read the converted WAV file
            audio, sr = sf.read(tmp_wav_path, dtype="float32")
            logger.debug(f"Converted and read {file_path.name} via FFmpeg")
        except FileNotFoundError as ffmpeg_missing:
            # subprocess raises FileNotFoundError when the ffmpeg binary is absent
            raise AudioIOError(
                "FFmpeg not found. Please install FFmpeg: https://ffmpeg.org/download.html"
            ) from ffmpeg_missing
        except subprocess.CalledProcessError as ffmpeg_err:
            raise AudioIOError(
                f"FFmpeg conversion failed for {file_path}: {ffmpeg_err.stderr}"
            ) from ffmpeg_err
        except Exception as read_err:
            raise AudioIOError(
                f"Failed to read audio file {file_path}: {str(read_err)}"
            ) from read_err
        finally:
            # Clean up temporary file
            if Path(tmp_wav_path).exists():
                Path(tmp_wav_path).unlink()

    # Convert stereo to mono if needed (in case FFmpeg didn't do it)
    if audio.ndim > 1:
        audio = audio.mean(axis=1)

    # Resample if target sample rate specified and not already done
    if target_sr is not None and sr != target_sr:
        audio = resample_audio(audio, sr, target_sr)
        sr = target_sr

    logger.debug(f"Read audio: {file_path.name} ({len(audio) / sr:.1f}s at {sr}Hz)")
    return audio, sr
def write_audio(
    file_path: str, audio: np.ndarray, sample_rate: int, format: Optional[str] = None
) -> None:
    """
    Write audio array to file.

    WAV and other soundfile-supported formats are written directly. For
    'm4a', 'aac' and 'mp4' the audio is first written to a temporary WAV and
    then encoded with FFmpeg (AAC codec, 192k bitrate, output rate capped at
    48000 Hz).

    Args:
        file_path: Output file path
        audio: Audio array (1D numpy array, float32)
        sample_rate: Sample rate in Hz
        format: Audio format ('wav', 'm4a', etc.), auto-detected from extension if None

    Raises:
        AudioIOError: If the file cannot be written, or FFmpeg is missing or fails
    """
    import subprocess
    import tempfile

    import soundfile as sf

    file_path = Path(file_path)

    # Ensure audio is 1D (drops singleton dimensions only)
    if audio.ndim > 1:
        audio = audio.squeeze()

    # Auto-detect format from extension
    if format is None:
        format = file_path.suffix.lstrip(".")

    try:
        # Create output directory if needed; inside the try so an OSError is
        # reported as AudioIOError like every other write failure
        file_path.parent.mkdir(parents=True, exist_ok=True)

        # Check if M4A/AAC format (not supported by soundfile)
        if format.lower() in ("m4a", "aac", "mp4"):
            logger.debug(f"Converting to {format.upper()} via FFmpeg...")

            # Reserve a temporary WAV path; the handle is closed before use
            with tempfile.NamedTemporaryFile(suffix=".wav", delete=False) as tmp_wav:
                tmp_wav_path = tmp_wav.name

            try:
                # Write WAV using soundfile
                sf.write(tmp_wav_path, audio, sample_rate, format="wav")

                # This module treats 48 kHz as the upper bound for M4A output
                output_sr = min(sample_rate, 48000)
                bitrate = "192k"  # Good quality for voice
                cmd = [
                    "ffmpeg",
                    "-i",
                    tmp_wav_path,
                    "-ar",
                    str(output_sr),
                    "-b:a",
                    bitrate,
                    "-c:a",
                    "aac",
                    "-y",  # Overwrite
                    str(file_path),
                ]
                try:
                    subprocess.run(cmd, capture_output=True, text=True, check=True)
                except FileNotFoundError as ffmpeg_missing:
                    # Same message as the convert_* helpers in this module
                    raise AudioIOError(
                        "FFmpeg not found. Please install FFmpeg: https://ffmpeg.org/download.html"
                    ) from ffmpeg_missing
                logger.debug(
                    f"Wrote audio: {file_path.name} ({len(audio) / sample_rate:.1f}s at {output_sr}Hz, {bitrate})"
                )
            finally:
                # Clean up temporary file
                if Path(tmp_wav_path).exists():
                    Path(tmp_wav_path).unlink()
        else:
            # Write directly with soundfile for WAV and other supported formats
            sf.write(str(file_path), audio, sample_rate, format=format)
            logger.debug(
                f"Wrote audio: {file_path.name} ({len(audio) / sample_rate:.1f}s at {sample_rate}Hz)"
            )
    except AudioIOError:
        # Already in the module's error type; do not re-wrap
        raise
    except subprocess.CalledProcessError as e:
        raise AudioIOError(f"FFmpeg conversion failed for {file_path}: {e.stderr}") from e
    except Exception as e:
        raise AudioIOError(f"Failed to write audio file {file_path}: {str(e)}") from e
def validate_audio_file(file_path: str, min_duration: float = 0.1) -> Tuple[bool, Optional[str]]:
    """
    Validate that file is a readable audio file.

    Checks, in order: the path exists, the file is non-empty, the extension is
    one of the supported ones, and the metadata reports a duration of at least
    ``min_duration``. Metadata comes from ffprobe for .m4a/.aac/.mp4 and from
    soundfile for every other extension. This function never raises; every
    failure is reported through the returned message.

    Args:
        file_path: Path to audio file
        min_duration: Minimum duration in seconds (default: 0.1)

    Returns:
        Tuple of (is_valid, error_message)
        - is_valid: True if file is valid audio
        - error_message: Description of validation failure, None if valid
    """
    try:
        file_path = Path(file_path)

        # Check file exists
        if not file_path.exists():
            return False, f"File not found: {file_path}"

        # Check file is not empty
        if file_path.stat().st_size == 0:
            return False, f"File is empty: {file_path}"

        # Check file extension
        valid_extensions = {".m4a", ".wav", ".mp3", ".flac", ".ogg", ".aac", ".mp4"}
        suffix = file_path.suffix.lower()
        if suffix not in valid_extensions:
            # sorted() keeps the message stable; set iteration order is not
            return (
                False,
                f"Unsupported format: {file_path.suffix}. Supported formats: {', '.join(sorted(valid_extensions))}",
            )

        import subprocess

        uses_ffprobe = suffix in (".m4a", ".aac", ".mp4")
        if not uses_ffprobe:
            # Only needed for the non-FFmpeg path, so M4A validation does not
            # depend on soundfile being importable
            import soundfile as sf

        try:
            if uses_ffprobe:
                import json

                result = subprocess.run(
                    [
                        "ffprobe",
                        "-v",
                        "error",
                        "-show_entries",
                        "format=duration,bit_rate:stream=codec_name,sample_rate,channels",
                        "-of",
                        "json",
                        str(file_path),
                    ],
                    capture_output=True,
                    text=True,
                    check=True,
                )
                probe_data = json.loads(result.stdout)
                if "format" not in probe_data or "duration" not in probe_data["format"]:
                    return False, "Invalid audio file: Cannot read metadata"
                duration = float(probe_data["format"]["duration"])
            else:
                info = sf.info(str(file_path))

                # Check basic properties
                if info.samplerate <= 0:
                    return False, f"Invalid sample rate: {info.samplerate}"
                if info.frames <= 0:
                    return False, "No audio frames in file"
                duration = info.frames / info.samplerate

            if duration < min_duration:
                return False, f"Audio too short: {duration:.2f}s (minimum: {min_duration}s)"
        except FileNotFoundError:
            # Raised by subprocess when the ffprobe binary is not installed
            return False, "FFmpeg/FFprobe not found. Please install FFmpeg for M4A support."
        except subprocess.CalledProcessError as e:
            return False, f"Cannot read audio metadata: {e.stderr}"
        except Exception as e:
            return False, f"Invalid audio file: {str(e)}"

        return True, None
    except Exception as e:
        return False, f"Validation error: {str(e)}"
def get_audio_duration(file_path: str) -> float:
    """
    Return the length of an audio file in seconds.

    .m4a/.aac/.mp4 files are probed with FFprobe; every other extension is
    inspected with soundfile (frames divided by sample rate).

    Args:
        file_path: Path to audio file

    Returns:
        Duration in seconds

    Raises:
        AudioIOError: If the duration cannot be determined for any reason
    """
    ffprobe_suffixes = [".m4a", ".aac", ".mp4"]
    try:
        suffix = Path(file_path).suffix.lower()

        if suffix not in ffprobe_suffixes:
            # WAV and other formats: soundfile reads the header directly
            import soundfile as sf

            header = sf.info(str(file_path))
            return header.frames / header.samplerate

        # M4A/AAC: soundfile doesn't support them, so ask FFprobe
        import subprocess

        probe_cmd = [
            "ffprobe",
            "-v",
            "error",
            "-show_entries",
            "format=duration",
            "-of",
            "default=noprint_wrappers=1:nokey=1",
            str(file_path),
        ]
        completed = subprocess.run(probe_cmd, capture_output=True, text=True, check=True)
        return float(completed.stdout.strip())
    except Exception as e:
        raise AudioIOError(f"Failed to get audio duration for {file_path}: {str(e)}")
def get_audio_info(file_path: str) -> dict:
    """
    Get detailed information about audio file.

    .m4a/.aac/.mp4 files are probed with FFprobe (first audio stream), matching
    get_audio_duration and validate_audio_file; every other extension is read
    with soundfile.

    Args:
        file_path: Path to audio file

    Returns:
        Dictionary with keys: duration, sample_rate, channels, format, subtype,
        frames. On the FFprobe path, format is the container format name,
        subtype is the codec name, and frames is estimated as
        round(duration * sample_rate).

    Raises:
        AudioIOError: If file cannot be read
    """
    try:
        if Path(file_path).suffix.lower() in (".m4a", ".aac", ".mp4"):
            import json
            import subprocess

            completed = subprocess.run(
                [
                    "ffprobe",
                    "-v",
                    "error",
                    "-select_streams",
                    "a:0",
                    "-show_entries",
                    "format=duration,format_name:stream=codec_name,sample_rate,channels",
                    "-of",
                    "json",
                    str(file_path),
                ],
                capture_output=True,
                text=True,
                check=True,
            )
            probe = json.loads(completed.stdout)
            # A missing stream or key raises here and is reported as AudioIOError below
            stream = probe["streams"][0]
            container = probe["format"]
            duration = float(container["duration"])
            sample_rate = int(stream["sample_rate"])
            return {
                "duration": duration,
                "sample_rate": sample_rate,
                "channels": int(stream["channels"]),
                "format": container.get("format_name"),
                "subtype": stream.get("codec_name"),
                "frames": int(round(duration * sample_rate)),
            }

        import soundfile as sf

        info = sf.info(str(file_path))
        return {
            "duration": info.frames / info.samplerate,
            "sample_rate": info.samplerate,
            "channels": info.channels,
            "format": info.format,
            "subtype": info.subtype,
            "frames": info.frames,
        }
    except Exception as e:
        raise AudioIOError(f"Failed to get audio info for {file_path}: {str(e)}") from e
def resample_audio(audio: np.ndarray, orig_sr: int, target_sr: int) -> np.ndarray:
    """
    Resample audio to target sample rate.

    When both rates are equal the input array is returned unchanged (same
    object) and librosa is never imported, so a no-op call works even when
    librosa is unavailable.

    Args:
        audio: Audio array
        orig_sr: Original sample rate
        target_sr: Target sample rate

    Returns:
        Resampled audio array

    Raises:
        AudioIOError: If librosa cannot be imported or resampling fails
    """
    # Nothing to do: skip the librosa import entirely
    if orig_sr == target_sr:
        return audio

    try:
        import librosa

        return librosa.resample(audio, orig_sr=orig_sr, target_sr=target_sr)
    except Exception as e:
        raise AudioIOError(f"Failed to resample audio: {str(e)}") from e
def normalize_audio(audio: np.ndarray, target_db: float = -20.0) -> np.ndarray:
    """
    Normalize audio to target dB level.

    The gain is chosen so the RMS of the result equals 10 ** (target_db / 20).
    If that gain pushes any sample above 1.0 in magnitude, the result is
    rescaled so its peak is 0.99. Empty or all-zero input is returned
    unchanged.

    Args:
        audio: Audio array
        target_db: Target level in dB (default: -20dB)

    Returns:
        Normalized audio array
    """
    # An empty array has no RMS and no peak; return it as-is instead of crashing
    if audio.size == 0:
        return audio

    # Calculate current RMS
    rms = np.sqrt(np.mean(audio**2))
    if rms == 0:
        # Pure silence: no gain can reach the target level
        return audio

    # Calculate target RMS from dB
    target_rms = 10 ** (target_db / 20)

    # Apply gain
    gain = target_rms / rms
    normalized = audio * gain

    # Prevent clipping
    max_val = np.abs(normalized).max()
    if max_val > 1.0:
        normalized = normalized / max_val * 0.99

    return normalized
def extract_segment(
    audio: np.ndarray, sample_rate: int, start_time: float, end_time: float
) -> np.ndarray:
    """
    Extract segment from audio array.

    Both boundaries are converted to sample indices and clamped to
    [0, len(audio)]. The end index is never allowed to fall before the start
    index, so a negative or reversed range gives an empty segment rather than
    a slice counted from the end of the array.

    Args:
        audio: Audio array
        sample_rate: Sample rate in Hz
        start_time: Start time in seconds
        end_time: End time in seconds

    Returns:
        Audio segment array (empty if the range selects no samples)
    """
    total_samples = len(audio)

    # Clamp start to the valid range
    start_sample = min(max(0, int(start_time * sample_rate)), total_samples)

    # Clamp end to [start_sample, total_samples]; a negative index here would
    # otherwise be interpreted by NumPy as an offset from the end
    end_sample = min(max(start_sample, int(end_time * sample_rate)), total_samples)

    return audio[start_sample:end_sample]
def split_audio_chunks(
    audio: np.ndarray, sample_rate: int, chunk_duration: float, overlap: float = 0.0
) -> list:
    """
    Split audio into chunks for processing.

    Chunks start every (chunk_duration - overlap) seconds. The final chunk may
    be shorter than chunk_duration; iteration stops as soon as a chunk reaches
    the end of the audio.

    Args:
        audio: Audio array
        sample_rate: Sample rate in Hz
        chunk_duration: Chunk duration in seconds
        overlap: Overlap between chunks in seconds

    Returns:
        List of (chunk_audio, start_time, end_time) tuples

    Raises:
        ValueError: If chunk_duration covers no samples, or overlap is not
            smaller than chunk_duration (either would never advance)
    """
    chunk_samples = int(chunk_duration * sample_rate)
    overlap_samples = int(overlap * sample_rate)
    if chunk_samples <= 0:
        raise ValueError("chunk_duration must cover at least one sample")

    step_samples = chunk_samples - overlap_samples
    if step_samples <= 0:
        # A zero or negative step would loop forever
        raise ValueError("overlap must be smaller than chunk_duration")

    total_samples = len(audio)
    chunks = []
    for position in range(0, total_samples, step_samples):
        chunk_end = min(position + chunk_samples, total_samples)
        chunks.append(
            (audio[position:chunk_end], position / sample_rate, chunk_end / sample_rate)
        )
        # Stop if we've reached the end, so overlap does not produce a
        # trailing chunk that only repeats already-covered samples
        if chunk_end >= total_samples:
            break

    return chunks
| # ===== M4A/WAV Conversion Utilities (T007-T008) ===== | |
def convert_m4a_to_wav(
    input_path: str, output_path: Optional[str] = None, sample_rate: int = 16000
) -> str:
    """
    Transcode an M4A/AAC file to a mono WAV file with FFmpeg.

    The WAV output is what pyannote.audio processing expects as input.

    Args:
        input_path: Path to input M4A/AAC file
        output_path: Path for output WAV file; when None, the input path with
            its suffix replaced by ".wav" is used
        sample_rate: Target sample rate in Hz (default: 16000 for pyannote)

    Returns:
        Path to converted WAV file

    Raises:
        AudioIOError: If the input is missing, FFmpeg is not available, or the
            conversion fails
    """
    import subprocess
    from pathlib import Path

    source = Path(input_path)
    if not source.exists():
        raise AudioIOError(f"Input file not found: {source}")

    # Default destination sits next to the source, with a .wav suffix
    destination = source.with_suffix(".wav") if output_path is None else Path(output_path)

    # Make sure the destination directory exists before FFmpeg writes to it
    destination.parent.mkdir(parents=True, exist_ok=True)

    try:
        ffmpeg_args = ["ffmpeg", "-i", str(source)]
        ffmpeg_args += ["-ar", str(sample_rate)]  # Resample to target rate
        ffmpeg_args += ["-ac", "1"]  # Convert to mono
        ffmpeg_args += ["-y", str(destination)]  # Overwrite output
        subprocess.run(ffmpeg_args, capture_output=True, text=True, check=True)
        logger.info(f"Converted {source.name} to WAV at {sample_rate}Hz")
        return str(destination)
    except FileNotFoundError:
        raise AudioIOError(
            "FFmpeg not found. Please install FFmpeg: https://ffmpeg.org/download.html"
        )
    except subprocess.CalledProcessError as e:
        raise AudioIOError(f"FFmpeg conversion failed: {e.stderr}")
def convert_wav_to_m4a(
    input_path: str, output_path: str, sample_rate: int = 44100, bitrate: str = "192k"
) -> str:
    """
    Encode a WAV file as M4A/AAC with FFmpeg.

    Intended for exporting the final processed audio in M4A format.

    Args:
        input_path: Path to input WAV file
        output_path: Path for output M4A file
        sample_rate: Target sample rate in Hz (default: 44100); values above
            48000 are lowered to 48000 with a warning
        bitrate: Target bitrate (default: "192k")

    Returns:
        Path to converted M4A file

    Raises:
        AudioIOError: If the input is missing, FFmpeg is not available, or the
            conversion fails
    """
    import subprocess
    from pathlib import Path

    source = Path(input_path)
    destination = Path(output_path)

    if not source.exists():
        raise AudioIOError(f"Input file not found: {source}")

    # Cap the rate at the 48 kHz limit this module applies to M4A output
    if sample_rate > 48000:
        logger.warning(f"Sample rate {sample_rate}Hz exceeds M4A limit, using 48000Hz")
        sample_rate = 48000

    # Make sure the destination directory exists before FFmpeg writes to it
    destination.parent.mkdir(parents=True, exist_ok=True)

    try:
        ffmpeg_args = ["ffmpeg", "-i", str(source)]
        ffmpeg_args += ["-ar", str(sample_rate)]  # Resample to target rate
        ffmpeg_args += ["-b:a", bitrate]  # Set bitrate
        ffmpeg_args += ["-c:a", "aac"]  # Use AAC codec
        ffmpeg_args += ["-y", str(destination)]  # Overwrite output
        subprocess.run(ffmpeg_args, capture_output=True, text=True, check=True)
        logger.info(f"Converted {source.name} to M4A at {sample_rate}Hz, {bitrate}")
        return str(destination)
    except FileNotFoundError:
        raise AudioIOError(
            "FFmpeg not found. Please install FFmpeg: https://ffmpeg.org/download.html"
        )
    except subprocess.CalledProcessError as e:
        raise AudioIOError(f"FFmpeg conversion failed: {e.stderr}")
| # ===== Audio Quality Validation (T009) ===== | |
def validate_audio_quality(
    audio: np.ndarray, sample_rate: int, file_path: Optional[str] = None
) -> dict:
    """
    Validate audio quality and return metrics.

    Checks for issues like:
    - Signal-to-Noise Ratio (SNR), estimated from the 10th and 90th percentile
      of the absolute sample values
    - Clipping/distortion (more than 1% of samples above 0.99 in magnitude)
    - Duration requirements
    - RMS energy levels

    Empty audio is reported as invalid with a single warning instead of
    raising.

    Args:
        audio: Audio array
        sample_rate: Sample rate in Hz
        file_path: Optional file path for logging

    Returns:
        Dictionary with quality metrics and validation results:
        {
            'snr_db': float,          # Signal-to-noise ratio estimate in dB
            'is_clipped': bool,       # True if audio has clipping
            'clipping_ratio': float,  # Fraction of clipped samples (0.0-1.0)
            'rms_energy': float,      # RMS energy level
            'is_too_quiet': bool,     # True if RMS energy is below 0.01
            'duration': float,        # Duration in seconds
            'is_valid': bool,         # Overall validation result
            'warnings': list,         # List of warning messages
        }
    """
    file_desc = f" for {file_path}" if file_path else ""
    metrics = {"duration": len(audio) / sample_rate, "warnings": []}

    if len(audio) == 0:
        # Percentiles and ratios are undefined without samples; fail explicitly
        metrics["warnings"].append("Audio is empty (no samples)")
        metrics.update(
            snr_db=0.0,
            is_clipped=False,
            clipping_ratio=0.0,
            rms_energy=0.0,
            is_too_quiet=True,
            is_valid=False,
        )
        logger.warning(
            f"Audio quality validation failed{file_desc}: " + ", ".join(metrics["warnings"])
        )
        return metrics

    magnitude = np.abs(audio)

    # Calculate SNR estimate
    noise_floor = np.percentile(magnitude, 10)
    signal_peak = np.percentile(magnitude, 90)
    with np.errstate(divide="ignore"):
        # Fully silent audio gives log10(0) = -inf, which correctly fails the check
        snr_db = 20 * np.log10(signal_peak / (noise_floor + 1e-10))
    metrics["snr_db"] = float(snr_db)

    if snr_db < 15:
        metrics["warnings"].append(f"Low SNR ({snr_db:.1f} dB < 15 dB)")

    # Check for clipping
    clipping_threshold = 0.99
    clipped_samples = np.sum(magnitude > clipping_threshold)
    clipping_ratio = clipped_samples / len(audio)
    # bool()/float() so the dict holds plain Python values, not NumPy scalars
    metrics["is_clipped"] = bool(clipping_ratio > 0.01)
    metrics["clipping_ratio"] = float(clipping_ratio)

    if metrics["is_clipped"]:
        metrics["warnings"].append(f"Audio has clipping ({clipping_ratio * 100:.1f}% of samples)")

    # Check RMS energy
    rms_energy = np.sqrt(np.mean(audio**2))
    metrics["rms_energy"] = float(rms_energy)
    metrics["is_too_quiet"] = bool(rms_energy < 0.01)

    if metrics["is_too_quiet"]:
        metrics["warnings"].append(f"Audio is too quiet (RMS: {rms_energy:.4f})")

    # Check duration
    if metrics["duration"] < 1.0:
        metrics["warnings"].append(f"Audio is very short ({metrics['duration']:.1f}s)")

    # Overall validation; thresholds here are looser than the warning thresholds
    metrics["is_valid"] = bool(
        snr_db >= 10  # Minimum acceptable SNR
        and not metrics["is_clipped"]
        and not metrics["is_too_quiet"]
        and metrics["duration"] > 0.5
    )

    # Log results
    if metrics["is_valid"]:
        logger.debug(f"Audio quality validation passed{file_desc}")
    else:
        logger.warning(
            f"Audio quality validation failed{file_desc}: " + ", ".join(metrics["warnings"])
        )

    return metrics