""" Audio I/O utilities: Read, write, and validate audio files. Handles m4a and wav formats with format validation and error handling. """ import logging from pathlib import Path from typing import Optional, Tuple import numpy as np logger = logging.getLogger(__name__) class AudioIOError(Exception): """Custom exception for audio I/O errors.""" pass def read_audio(file_path: str, target_sr: Optional[int] = None) -> Tuple[np.ndarray, int]: """ Read audio file and return waveform and sample rate. Supports m4a and wav formats. Automatically converts to mono if stereo. Args: file_path: Path to audio file target_sr: Target sample rate (resamples if different), None = keep original Returns: Tuple of (audio_array, sample_rate) - audio_array: 1D numpy array of audio samples (float32, mono) - sample_rate: Sample rate in Hz Raises: AudioIOError: If file cannot be read or format is invalid """ import subprocess import tempfile import soundfile as sf file_path = Path(file_path) if not file_path.exists(): raise AudioIOError(f"Audio file not found: {file_path}") try: # Try reading directly with soundfile audio, sr = sf.read(str(file_path), dtype="float32") except Exception as e: # If M4A/AAC format not recognized, convert to WAV using FFmpeg if file_path.suffix.lower() in [".m4a", ".aac", ".mp4"]: logger.debug(f"Converting {file_path.suffix} to WAV for reading...") # Create temporary WAV file with tempfile.NamedTemporaryFile(suffix=".wav", delete=False) as tmp_wav: tmp_wav_path = tmp_wav.name try: # Convert M4A to WAV using FFmpeg target_rate = target_sr if target_sr else 44100 cmd = [ "ffmpeg", "-i", str(file_path), "-ar", str(target_rate), "-ac", "1", # Mono "-y", # Overwrite tmp_wav_path, ] result = subprocess.run(cmd, capture_output=True, text=True, check=True) # Read the converted WAV file audio, sr = sf.read(tmp_wav_path, dtype="float32") logger.debug(f"Converted and read {file_path.name} via FFmpeg") finally: # Clean up temporary file if Path(tmp_wav_path).exists(): Path(tmp_wav_path).unlink() else: # Not an M4A file, re-raise the original error raise AudioIOError(f"Failed to read audio file {file_path}: {str(e)}") # Convert stereo to mono if needed (in case FFmpeg didn't do it) if audio.ndim > 1: audio = audio.mean(axis=1) # Resample if target sample rate specified and not already done if target_sr is not None and sr != target_sr: audio = resample_audio(audio, sr, target_sr) sr = target_sr logger.debug(f"Read audio: {file_path.name} ({len(audio) / sr:.1f}s at {sr}Hz)") return audio, sr def write_audio( file_path: str, audio: np.ndarray, sample_rate: int, format: Optional[str] = None ) -> None: """ Write audio array to file. Args: file_path: Output file path audio: Audio array (1D numpy array, float32) sample_rate: Sample rate in Hz format: Audio format ('wav', 'm4a', etc.), auto-detected from extension if None Raises: AudioIOError: If file cannot be written """ import subprocess import tempfile import soundfile as sf file_path = Path(file_path) # Create output directory if needed file_path.parent.mkdir(parents=True, exist_ok=True) # Ensure audio is 1D if audio.ndim > 1: audio = audio.squeeze() # Auto-detect format from extension if format is None: format = file_path.suffix.lstrip(".") try: # Check if M4A/AAC format (not supported by soundfile) if format.lower() in ["m4a", "aac", "mp4"]: logger.debug(f"Converting to {format.upper()} via FFmpeg...") # Write to temporary WAV file first with tempfile.NamedTemporaryFile(suffix=".wav", delete=False) as tmp_wav: tmp_wav_path = tmp_wav.name try: # Write WAV using soundfile sf.write(tmp_wav_path, audio, sample_rate, format="wav") # Convert WAV to M4A using FFmpeg # Clamp sample rate to M4A maximum (48kHz) output_sr = min(sample_rate, 48000) bitrate = "192k" # Good quality for voice cmd = [ "ffmpeg", "-i", tmp_wav_path, "-ar", str(output_sr), "-b:a", bitrate, "-c:a", "aac", "-y", # Overwrite str(file_path), ] result = subprocess.run(cmd, capture_output=True, text=True, check=True) logger.debug( f"Wrote audio: {file_path.name} ({len(audio) / sample_rate:.1f}s at {output_sr}Hz, {bitrate})" ) finally: # Clean up temporary file if Path(tmp_wav_path).exists(): Path(tmp_wav_path).unlink() else: # Write directly with soundfile for WAV and other supported formats sf.write(str(file_path), audio, sample_rate, format=format) logger.debug( f"Wrote audio: {file_path.name} ({len(audio) / sample_rate:.1f}s at {sample_rate}Hz)" ) except subprocess.CalledProcessError as e: raise AudioIOError(f"FFmpeg conversion failed for {file_path}: {e.stderr}") except Exception as e: raise AudioIOError(f"Failed to write audio file {file_path}: {str(e)}") def validate_audio_file(file_path: str, min_duration: float = 0.1) -> Tuple[bool, Optional[str]]: """ Validate that file is a readable audio file with comprehensive checks. Args: file_path: Path to audio file min_duration: Minimum duration in seconds (default: 0.1) Returns: Tuple of (is_valid, error_message) - is_valid: True if file is valid audio - error_message: Description of validation failure, None if valid """ try: file_path = Path(file_path) # Check file exists if not file_path.exists(): return False, f"File not found: {file_path}" # Check file is not empty if file_path.stat().st_size == 0: return False, f"File is empty: {file_path}" # Check file extension valid_extensions = {".m4a", ".wav", ".mp3", ".flac", ".ogg", ".aac", ".mp4"} if file_path.suffix.lower() not in valid_extensions: return ( False, f"Unsupported format: {file_path.suffix}. Supported formats: {', '.join(valid_extensions)}", ) # Try to read file metadata import subprocess import soundfile as sf try: # For M4A/AAC, use ffprobe for metadata if file_path.suffix.lower() in [".m4a", ".aac", ".mp4"]: result = subprocess.run( [ "ffprobe", "-v", "error", "-show_entries", "format=duration,bit_rate:stream=codec_name,sample_rate,channels", "-of", "json", str(file_path), ], capture_output=True, text=True, check=True, ) import json probe_data = json.loads(result.stdout) if "format" not in probe_data or "duration" not in probe_data["format"]: return False, f"Invalid audio file: Cannot read metadata" duration = float(probe_data["format"]["duration"]) if duration < min_duration: return False, f"Audio too short: {duration:.2f}s (minimum: {min_duration}s)" else: # For WAV and other formats, use soundfile info = sf.info(str(file_path)) # Check basic properties if info.samplerate <= 0: return False, f"Invalid sample rate: {info.samplerate}" if info.frames <= 0: return False, f"No audio frames in file" duration = info.frames / info.samplerate if duration < min_duration: return False, f"Audio too short: {duration:.2f}s (minimum: {min_duration}s)" except FileNotFoundError: return False, "FFmpeg/FFprobe not found. Please install FFmpeg for M4A support." except subprocess.CalledProcessError as e: return False, f"Cannot read audio metadata: {e.stderr}" except Exception as e: return False, f"Invalid audio file: {str(e)}" return True, None except Exception as e: return False, f"Validation error: {str(e)}" def get_audio_duration(file_path: str) -> float: """ Get duration of audio file in seconds. Args: file_path: Path to audio file Returns: Duration in seconds Raises: AudioIOError: If file cannot be read """ try: # For M4A/AAC files, use FFprobe since soundfile doesn't support them if Path(file_path).suffix.lower() in [".m4a", ".aac", ".mp4"]: import subprocess result = subprocess.run( [ "ffprobe", "-v", "error", "-show_entries", "format=duration", "-of", "default=noprint_wrappers=1:nokey=1", str(file_path), ], capture_output=True, text=True, check=True, ) return float(result.stdout.strip()) else: # For WAV and other formats, use soundfile import soundfile as sf info = sf.info(str(file_path)) return info.frames / info.samplerate except Exception as e: raise AudioIOError(f"Failed to get audio duration for {file_path}: {str(e)}") def get_audio_info(file_path: str) -> dict: """ Get detailed information about audio file. Args: file_path: Path to audio file Returns: Dictionary with keys: duration, sample_rate, channels, format, subtype Raises: AudioIOError: If file cannot be read """ try: import soundfile as sf info = sf.info(str(file_path)) return { "duration": info.frames / info.samplerate, "sample_rate": info.samplerate, "channels": info.channels, "format": info.format, "subtype": info.subtype, "frames": info.frames, } except Exception as e: raise AudioIOError(f"Failed to get audio info for {file_path}: {str(e)}") def resample_audio(audio: np.ndarray, orig_sr: int, target_sr: int) -> np.ndarray: """ Resample audio to target sample rate. Args: audio: Audio array orig_sr: Original sample rate target_sr: Target sample rate Returns: Resampled audio array """ try: import librosa if orig_sr == target_sr: return audio resampled = librosa.resample(audio, orig_sr=orig_sr, target_sr=target_sr) return resampled except Exception as e: raise AudioIOError(f"Failed to resample audio: {str(e)}") def normalize_audio(audio: np.ndarray, target_db: float = -20.0) -> np.ndarray: """ Normalize audio to target dB level. Args: audio: Audio array target_db: Target level in dB (default: -20dB) Returns: Normalized audio array """ # Calculate current RMS rms = np.sqrt(np.mean(audio**2)) if rms == 0: return audio # Calculate target RMS from dB target_rms = 10 ** (target_db / 20) # Apply gain gain = target_rms / rms normalized = audio * gain # Prevent clipping max_val = np.abs(normalized).max() if max_val > 1.0: normalized = normalized / max_val * 0.99 return normalized def extract_segment( audio: np.ndarray, sample_rate: int, start_time: float, end_time: float ) -> np.ndarray: """ Extract segment from audio array. Args: audio: Audio array sample_rate: Sample rate in Hz start_time: Start time in seconds end_time: End time in seconds Returns: Audio segment array """ start_sample = int(start_time * sample_rate) end_sample = int(end_time * sample_rate) # Clamp to valid range start_sample = max(0, start_sample) end_sample = min(len(audio), end_sample) return audio[start_sample:end_sample] def split_audio_chunks( audio: np.ndarray, sample_rate: int, chunk_duration: float, overlap: float = 0.0 ) -> list: """ Split audio into chunks for processing. Args: audio: Audio array sample_rate: Sample rate in Hz chunk_duration: Chunk duration in seconds overlap: Overlap between chunks in seconds Returns: List of (chunk_audio, start_time, end_time) tuples """ chunk_samples = int(chunk_duration * sample_rate) overlap_samples = int(overlap * sample_rate) step_samples = chunk_samples - overlap_samples chunks = [] position = 0 while position < len(audio): chunk_end = min(position + chunk_samples, len(audio)) chunk = audio[position:chunk_end] start_time = position / sample_rate end_time = chunk_end / sample_rate chunks.append((chunk, start_time, end_time)) position += step_samples # Stop if we've reached the end if chunk_end >= len(audio): break return chunks # ===== M4A/WAV Conversion Utilities (T007-T008) ===== def convert_m4a_to_wav( input_path: str, output_path: Optional[str] = None, sample_rate: int = 16000 ) -> str: """ Convert M4A/AAC audio file to WAV format using FFmpeg. This is required for pyannote.audio processing which expects WAV input. Args: input_path: Path to input M4A/AAC file output_path: Path for output WAV file (auto-generated if None) sample_rate: Target sample rate in Hz (default: 16000 for pyannote) Returns: Path to converted WAV file Raises: AudioIOError: If conversion fails or FFmpeg is not available """ import subprocess from pathlib import Path input_path = Path(input_path) if not input_path.exists(): raise AudioIOError(f"Input file not found: {input_path}") # Auto-generate output path if not provided if output_path is None: output_path = input_path.with_suffix(".wav") else: output_path = Path(output_path) # Create output directory if needed output_path.parent.mkdir(parents=True, exist_ok=True) try: # Run FFmpeg conversion cmd = [ "ffmpeg", "-i", str(input_path), "-ar", str(sample_rate), # Resample to target rate "-ac", "1", # Convert to mono "-y", # Overwrite output str(output_path), ] result = subprocess.run(cmd, capture_output=True, text=True, check=True) logger.info(f"Converted {input_path.name} to WAV at {sample_rate}Hz") return str(output_path) except FileNotFoundError: raise AudioIOError( "FFmpeg not found. Please install FFmpeg: https://ffmpeg.org/download.html" ) except subprocess.CalledProcessError as e: raise AudioIOError(f"FFmpeg conversion failed: {e.stderr}") def convert_wav_to_m4a( input_path: str, output_path: str, sample_rate: int = 44100, bitrate: str = "192k" ) -> str: """ Convert WAV audio file to M4A/AAC format using FFmpeg. Used for exporting final processed audio in M4A format. Args: input_path: Path to input WAV file output_path: Path for output M4A file sample_rate: Target sample rate in Hz (default: 44100, max 48000 for M4A) bitrate: Target bitrate (default: "192k") Returns: Path to converted M4A file Raises: AudioIOError: If conversion fails or FFmpeg is not available """ import subprocess from pathlib import Path input_path = Path(input_path) output_path = Path(output_path) if not input_path.exists(): raise AudioIOError(f"Input file not found: {input_path}") # Validate sample rate for M4A (max 48kHz) if sample_rate > 48000: logger.warning(f"Sample rate {sample_rate}Hz exceeds M4A limit, using 48000Hz") sample_rate = 48000 # Create output directory if needed output_path.parent.mkdir(parents=True, exist_ok=True) try: # Run FFmpeg conversion cmd = [ "ffmpeg", "-i", str(input_path), "-ar", str(sample_rate), # Resample to target rate "-b:a", bitrate, # Set bitrate "-c:a", "aac", # Use AAC codec "-y", # Overwrite output str(output_path), ] result = subprocess.run(cmd, capture_output=True, text=True, check=True) logger.info(f"Converted {input_path.name} to M4A at {sample_rate}Hz, {bitrate}") return str(output_path) except FileNotFoundError: raise AudioIOError( "FFmpeg not found. Please install FFmpeg: https://ffmpeg.org/download.html" ) except subprocess.CalledProcessError as e: raise AudioIOError(f"FFmpeg conversion failed: {e.stderr}") # ===== Audio Quality Validation (T009) ===== def validate_audio_quality( audio: np.ndarray, sample_rate: int, file_path: Optional[str] = None ) -> dict: """ Validate audio quality and return metrics. Checks for issues like: - Signal-to-Noise Ratio (SNR) - Clipping/distortion - Duration requirements - RMS energy levels Args: audio: Audio array sample_rate: Sample rate in Hz file_path: Optional file path for logging Returns: Dictionary with quality metrics and validation results: { 'snr_db': float, # Signal-to-noise ratio in dB 'is_clipped': bool, # True if audio has clipping 'clipping_ratio': float, # Percentage of clipped samples 'rms_energy': float, # RMS energy level 'is_too_quiet': bool, # True if audio is too quiet 'duration': float, # Duration in seconds 'is_valid': bool, # Overall validation result 'warnings': list, # List of warning messages } """ metrics = {"duration": len(audio) / sample_rate, "warnings": []} # Calculate SNR estimate noise_floor = np.percentile(np.abs(audio), 10) signal_peak = np.percentile(np.abs(audio), 90) snr_db = 20 * np.log10(signal_peak / (noise_floor + 1e-10)) metrics["snr_db"] = float(snr_db) if snr_db < 15: metrics["warnings"].append(f"Low SNR ({snr_db:.1f} dB < 15 dB)") # Check for clipping clipping_threshold = 0.99 clipped_samples = np.sum(np.abs(audio) > clipping_threshold) clipping_ratio = clipped_samples / len(audio) metrics["is_clipped"] = clipping_ratio > 0.01 metrics["clipping_ratio"] = float(clipping_ratio) if metrics["is_clipped"]: metrics["warnings"].append(f"Audio has clipping ({clipping_ratio * 100:.1f}% of samples)") # Check RMS energy rms_energy = np.sqrt(np.mean(audio**2)) metrics["rms_energy"] = float(rms_energy) metrics["is_too_quiet"] = rms_energy < 0.01 if metrics["is_too_quiet"]: metrics["warnings"].append(f"Audio is too quiet (RMS: {rms_energy:.4f})") # Check duration if metrics["duration"] < 1.0: metrics["warnings"].append(f"Audio is very short ({metrics['duration']:.1f}s)") # Overall validation metrics["is_valid"] = ( snr_db >= 10 # Minimum acceptable SNR and not metrics["is_clipped"] and not metrics["is_too_quiet"] and metrics["duration"] > 0.5 ) # Log results file_desc = f" for {file_path}" if file_path else "" if metrics["is_valid"]: logger.debug(f"Audio quality validation passed{file_desc}") else: logger.warning( f"Audio quality validation failed{file_desc}: " + ", ".join(metrics["warnings"]) ) return metrics