voice-tools / src /lib /audio_io.py
jcudit's picture
jcudit HF Staff
fix: also correct lib/ in gitignore to only exclude root-level, add src/lib package
3ff2f18
"""
Audio I/O utilities: Read, write, and validate audio files.
Handles m4a and wav formats with format validation and error handling.
"""
import logging
from pathlib import Path
from typing import Optional, Tuple
import numpy as np
logger = logging.getLogger(__name__)
class AudioIOError(Exception):
    """Error raised by this module's helpers when an audio operation fails."""
def read_audio(file_path: str, target_sr: Optional[int] = None) -> Tuple[np.ndarray, int]:
    """
    Read an audio file and return its waveform and sample rate.

    The file is first read directly with soundfile. If that fails and the
    extension is .m4a/.aac/.mp4, the file is converted to a temporary mono WAV
    with the ``ffmpeg`` command-line tool and that WAV is read instead.
    Multi-channel audio is averaged down to mono.

    Args:
        file_path: Path to audio file
        target_sr: Target sample rate (resamples if different), None = keep original

    Returns:
        Tuple of (audio_array, sample_rate)
        - audio_array: 1D numpy array of audio samples (float32 as read, mono)
        - sample_rate: Sample rate in Hz

    Raises:
        AudioIOError: If the file does not exist, cannot be decoded, or the
            FFmpeg fallback fails or FFmpeg is not installed
    """
    import subprocess
    import tempfile

    import soundfile as sf

    file_path = Path(file_path)
    if not file_path.exists():
        raise AudioIOError(f"Audio file not found: {file_path}")

    try:
        # Try reading directly with soundfile
        audio, sr = sf.read(str(file_path), dtype="float32")
    except Exception as e:
        # Only the M4A family gets the FFmpeg fallback; anything else is a hard failure.
        if file_path.suffix.lower() not in (".m4a", ".aac", ".mp4"):
            raise AudioIOError(f"Failed to read audio file {file_path}: {str(e)}") from e

        logger.debug(f"Converting {file_path.suffix} to WAV for reading...")

        # delete=False: only the name is needed here; FFmpeg writes the file
        # and the finally-block below removes it.
        with tempfile.NamedTemporaryFile(suffix=".wav", delete=False) as tmp_wav:
            tmp_wav_path = tmp_wav.name

        try:
            cmd = ["ffmpeg", "-i", str(file_path)]
            # Only force a rate when the caller asked for one, so that
            # target_sr=None really keeps the file's original sample rate.
            if target_sr:
                cmd += ["-ar", str(target_sr)]
            cmd += [
                "-ac",
                "1",  # Mono
                "-y",  # Overwrite
                tmp_wav_path,
            ]
            subprocess.run(cmd, capture_output=True, text=True, check=True)

            # Read the converted WAV file
            audio, sr = sf.read(tmp_wav_path, dtype="float32")
            logger.debug(f"Converted and read {file_path.name} via FFmpeg")
        except FileNotFoundError as missing_err:
            # subprocess raises FileNotFoundError when the ffmpeg binary is absent.
            raise AudioIOError(
                "FFmpeg not found. Please install FFmpeg: https://ffmpeg.org/download.html"
            ) from missing_err
        except subprocess.CalledProcessError as conv_err:
            raise AudioIOError(
                f"FFmpeg conversion failed for {file_path}: {conv_err.stderr}"
            ) from conv_err
        except Exception as read_err:
            raise AudioIOError(
                f"Failed to read audio file {file_path}: {str(read_err)}"
            ) from read_err
        finally:
            # Clean up temporary file
            if Path(tmp_wav_path).exists():
                Path(tmp_wav_path).unlink()

    # Convert stereo to mono if needed (direct soundfile reads keep all channels)
    if audio.ndim > 1:
        audio = audio.mean(axis=1)

    # Resample if target sample rate specified and not already done
    if target_sr is not None and sr != target_sr:
        audio = resample_audio(audio, sr, target_sr)
        sr = target_sr

    logger.debug(f"Read audio: {file_path.name} ({len(audio) / sr:.1f}s at {sr}Hz)")
    return audio, sr
def write_audio(
    file_path: str, audio: np.ndarray, sample_rate: int, format: Optional[str] = None
) -> None:
    """
    Write an audio array to a file.

    WAV and other soundfile-supported formats are written directly. For
    m4a/aac/mp4 the audio is first written to a temporary WAV and then encoded
    to AAC (192k) with the ``ffmpeg`` command-line tool; the output sample
    rate is capped at 48000 Hz.

    Args:
        file_path: Output file path (parent directories are created)
        audio: Audio array (1D numpy array, float32)
        sample_rate: Sample rate in Hz
        format: Audio format ('wav', 'm4a', etc.), auto-detected from extension if None

    Raises:
        AudioIOError: If the output directory cannot be created, the file
            cannot be written, or FFmpeg fails or is not installed
    """
    import subprocess
    import tempfile

    import soundfile as sf

    file_path = Path(file_path)

    # squeeze() drops length-1 axes only, e.g. (N, 1) -> (N,); a genuinely
    # multi-channel array is passed through unchanged.
    if audio.ndim > 1:
        audio = audio.squeeze()

    # Auto-detect format from extension
    if format is None:
        format = file_path.suffix.lstrip(".")

    try:
        # Create output directory if needed (inside the try so that failures
        # surface as AudioIOError like every other write error)
        file_path.parent.mkdir(parents=True, exist_ok=True)

        # Check if M4A/AAC format (not supported by soundfile)
        if format.lower() in ("m4a", "aac", "mp4"):
            logger.debug(f"Converting to {format.upper()} via FFmpeg...")

            # Reserve a temporary WAV path; delete=False because FFmpeg reads
            # it after this handle is closed. It is removed in the finally below.
            with tempfile.NamedTemporaryFile(suffix=".wav", delete=False) as tmp_wav:
                tmp_wav_path = tmp_wav.name

            try:
                # Write WAV using soundfile
                sf.write(tmp_wav_path, audio, sample_rate, format="wav")

                # Clamp sample rate to M4A maximum (48kHz)
                output_sr = min(sample_rate, 48000)
                bitrate = "192k"  # Good quality for voice
                cmd = [
                    "ffmpeg",
                    "-i",
                    tmp_wav_path,
                    "-ar",
                    str(output_sr),
                    "-b:a",
                    bitrate,
                    "-c:a",
                    "aac",
                    "-y",  # Overwrite
                    str(file_path),
                ]
                try:
                    subprocess.run(cmd, capture_output=True, text=True, check=True)
                except FileNotFoundError as missing_err:
                    # Raised by subprocess when the ffmpeg binary is absent.
                    raise AudioIOError(
                        "FFmpeg not found. Please install FFmpeg: https://ffmpeg.org/download.html"
                    ) from missing_err

                logger.debug(
                    f"Wrote audio: {file_path.name} ({len(audio) / sample_rate:.1f}s at {output_sr}Hz, {bitrate})"
                )
            finally:
                # Clean up temporary file
                if Path(tmp_wav_path).exists():
                    Path(tmp_wav_path).unlink()
        else:
            # Write directly with soundfile for WAV and other supported formats
            sf.write(str(file_path), audio, sample_rate, format=format)
            logger.debug(
                f"Wrote audio: {file_path.name} ({len(audio) / sample_rate:.1f}s at {sample_rate}Hz)"
            )
    except AudioIOError:
        # Already a module-level error with a specific message; do not re-wrap.
        raise
    except subprocess.CalledProcessError as e:
        raise AudioIOError(f"FFmpeg conversion failed for {file_path}: {e.stderr}") from e
    except Exception as e:
        raise AudioIOError(f"Failed to write audio file {file_path}: {str(e)}") from e
def validate_audio_file(file_path: str, min_duration: float = 0.1) -> Tuple[bool, Optional[str]]:
    """
    Check that a path points to a readable audio file of sufficient length.

    The checks run in order: the path exists, the file is non-empty, the
    extension is one of the accepted ones, and the duration reported by
    ffprobe (m4a/aac/mp4) or soundfile (everything else) is at least
    ``min_duration``. This function never raises; every failure is reported
    through the returned message.

    Args:
        file_path: Path to audio file
        min_duration: Minimum duration in seconds (default: 0.1)

    Returns:
        Tuple of (is_valid, error_message)
        - is_valid: True if file is valid audio
        - error_message: Description of validation failure, None if valid
    """
    try:
        file_path = Path(file_path)

        # Check file exists
        if not file_path.exists():
            return False, f"File not found: {file_path}"

        # Check file is not empty
        if file_path.stat().st_size == 0:
            return False, f"File is empty: {file_path}"

        # Check file extension
        valid_extensions = {".m4a", ".wav", ".mp3", ".flac", ".ogg", ".aac", ".mp4"}
        if file_path.suffix.lower() not in valid_extensions:
            # sorted(): set iteration order is arbitrary, which would make the
            # message differ from run to run.
            supported = ", ".join(sorted(valid_extensions))
            return (
                False,
                f"Unsupported format: {file_path.suffix}. Supported formats: {supported}",
            )

        # Heavy/optional dependencies are imported only once the cheap checks pass.
        import json
        import subprocess

        import soundfile as sf

        try:
            if file_path.suffix.lower() in (".m4a", ".aac", ".mp4"):
                # For M4A/AAC, use ffprobe for metadata
                result = subprocess.run(
                    [
                        "ffprobe",
                        "-v",
                        "error",
                        "-show_entries",
                        "format=duration,bit_rate:stream=codec_name,sample_rate,channels",
                        "-of",
                        "json",
                        str(file_path),
                    ],
                    capture_output=True,
                    text=True,
                    check=True,
                )
                probe_data = json.loads(result.stdout)

                if "format" not in probe_data or "duration" not in probe_data["format"]:
                    return False, "Invalid audio file: Cannot read metadata"

                # NOTE(review): only the container duration is checked here; the
                # stream entries requested above are not inspected.
                duration = float(probe_data["format"]["duration"])
                if duration < min_duration:
                    return False, f"Audio too short: {duration:.2f}s (minimum: {min_duration}s)"
            else:
                # For WAV and other formats, use soundfile
                info = sf.info(str(file_path))

                # Check basic properties
                if info.samplerate <= 0:
                    return False, f"Invalid sample rate: {info.samplerate}"
                if info.frames <= 0:
                    return False, "No audio frames in file"

                duration = info.frames / info.samplerate
                if duration < min_duration:
                    return False, f"Audio too short: {duration:.2f}s (minimum: {min_duration}s)"
        except FileNotFoundError:
            # subprocess raises this when the ffprobe binary is missing.
            return False, "FFmpeg/FFprobe not found. Please install FFmpeg for M4A support."
        except subprocess.CalledProcessError as e:
            return False, f"Cannot read audio metadata: {e.stderr}"
        except Exception as e:
            return False, f"Invalid audio file: {str(e)}"

        return True, None
    except Exception as e:
        return False, f"Validation error: {str(e)}"
def get_audio_duration(file_path: str) -> float:
    """
    Return the length of an audio file in seconds.

    Files with an .m4a/.aac/.mp4 extension are measured with the ``ffprobe``
    command-line tool; all other files are inspected with soundfile.

    Args:
        file_path: Path to audio file

    Returns:
        Duration in seconds

    Raises:
        AudioIOError: If the duration cannot be determined for any reason
    """
    ffprobe_suffixes = [".m4a", ".aac", ".mp4"]
    try:
        needs_ffprobe = Path(file_path).suffix.lower() in ffprobe_suffixes

        if not needs_ffprobe:
            # soundfile reports frame count and rate without decoding the audio data
            import soundfile as sf

            header = sf.info(str(file_path))
            return header.frames / header.samplerate

        # soundfile cannot open the M4A family, so ask ffprobe for the
        # container duration, printed as a bare number.
        import subprocess

        probe_cmd = [
            "ffprobe",
            "-v",
            "error",
            "-show_entries",
            "format=duration",
            "-of",
            "default=noprint_wrappers=1:nokey=1",
            str(file_path),
        ]
        completed = subprocess.run(probe_cmd, capture_output=True, text=True, check=True)
        return float(completed.stdout.strip())
    except Exception as e:
        raise AudioIOError(f"Failed to get audio duration for {file_path}: {str(e)}")
def get_audio_info(file_path: str) -> dict:
    """
    Get detailed information about an audio file.

    soundfile is tried first. If it cannot open the file and the extension is
    .m4a/.aac/.mp4, the first audio stream is probed with the ``ffprobe``
    command-line tool instead, matching how the other helpers in this module
    treat those formats.

    Args:
        file_path: Path to audio file

    Returns:
        Dictionary with keys: duration, sample_rate, channels, format,
        subtype, frames. For files probed with ffprobe, ``format`` is the
        container's format name, ``subtype`` is the audio codec name, and
        ``frames`` is estimated as round(duration * sample_rate).

    Raises:
        AudioIOError: If file cannot be read
    """
    try:
        suffix = Path(file_path).suffix.lower()

        try:
            import soundfile as sf

            info = sf.info(str(file_path))
            return {
                "duration": info.frames / info.samplerate,
                "sample_rate": info.samplerate,
                "channels": info.channels,
                "format": info.format,
                "subtype": info.subtype,
                "frames": info.frames,
            }
        except Exception:
            # Only the M4A family has an ffprobe fallback; for anything else
            # the soundfile error is the final answer.
            if suffix not in (".m4a", ".aac", ".mp4"):
                raise

        import json
        import subprocess

        result = subprocess.run(
            [
                "ffprobe",
                "-v",
                "error",
                "-select_streams",
                "a:0",  # first audio stream only
                "-show_entries",
                "format=duration,format_name:stream=codec_name,sample_rate,channels",
                "-of",
                "json",
                str(file_path),
            ],
            capture_output=True,
            text=True,
            check=True,
        )
        probe_data = json.loads(result.stdout)

        streams = probe_data.get("streams") or []
        if not streams:
            raise ValueError("no audio stream found")
        stream = streams[0]

        # ffprobe's JSON reports these numeric fields as strings.
        duration = float(probe_data["format"]["duration"])
        sample_rate = int(stream["sample_rate"])

        return {
            "duration": duration,
            "sample_rate": sample_rate,
            "channels": int(stream["channels"]),
            "format": probe_data["format"].get("format_name", ""),
            "subtype": stream.get("codec_name", ""),
            "frames": int(round(duration * sample_rate)),
        }
    except Exception as e:
        raise AudioIOError(f"Failed to get audio info for {file_path}: {str(e)}") from e
def resample_audio(audio: np.ndarray, orig_sr: int, target_sr: int) -> np.ndarray:
    """
    Resample audio to a target sample rate.

    Args:
        audio: Audio array
        orig_sr: Original sample rate
        target_sr: Target sample rate

    Returns:
        Resampled audio array; the input array itself (not a copy) when the
        two rates are already equal

    Raises:
        AudioIOError: If librosa cannot be imported or resampling fails
    """
    # Nothing to do: return before importing librosa so the no-op path does
    # not depend on (or pay the import cost of) that package.
    if orig_sr == target_sr:
        return audio

    try:
        import librosa

        return librosa.resample(audio, orig_sr=orig_sr, target_sr=target_sr)
    except Exception as e:
        raise AudioIOError(f"Failed to resample audio: {str(e)}") from e
def normalize_audio(audio: np.ndarray, target_db: float = -20.0) -> np.ndarray:
    """
    Scale audio so that its RMS level matches a target dB level.

    If the scaled signal would exceed full scale (|x| > 1.0), it is scaled
    down again so that its peak is 0.99, in which case the final RMS is
    below the target.

    Args:
        audio: Audio array
        target_db: Target RMS level in dB relative to 1.0 (default: -20dB)

    Returns:
        Normalized audio array. Empty or all-zero input is returned unchanged
        (the same object).
    """
    # An empty array has no level to measure (and .max() below would raise).
    if np.size(audio) == 0:
        return audio

    # Square in float64 so integer input cannot overflow; float() keeps the
    # gain a plain Python scalar so a float32 array stays float32 when scaled.
    rms = float(np.sqrt(np.mean(np.square(audio, dtype=np.float64))))
    if rms == 0:
        return audio

    # Calculate target RMS from dB
    target_rms = 10 ** (target_db / 20)

    # Apply gain
    gain = target_rms / rms
    normalized = audio * gain

    # Prevent clipping
    max_val = np.abs(normalized).max()
    if max_val > 1.0:
        normalized = normalized / max_val * 0.99

    return normalized
def extract_segment(
    audio: np.ndarray, sample_rate: int, start_time: float, end_time: float
) -> np.ndarray:
    """
    Extract a time range from an audio array.

    Both boundaries are converted to sample indices (truncating toward zero)
    and clamped to the array, so out-of-range times never wrap around.

    Args:
        audio: Audio array
        sample_rate: Sample rate in Hz
        start_time: Start time in seconds
        end_time: End time in seconds

    Returns:
        Audio segment array (a slice of ``audio``); empty when the range lies
        outside the audio or ``end_time`` is not after ``start_time``
    """
    total_samples = len(audio)

    # Clamp BOTH ends to [0, total_samples]. Without the lower bound on the
    # end index, a negative end_time would become a negative slice index and
    # silently count from the end of the array.
    start_sample = min(max(int(start_time * sample_rate), 0), total_samples)
    end_sample = min(max(int(end_time * sample_rate), 0), total_samples)

    return audio[start_sample:end_sample]
def split_audio_chunks(
    audio: np.ndarray, sample_rate: int, chunk_duration: float, overlap: float = 0.0
) -> list:
    """
    Split audio into consecutive, optionally overlapping chunks.

    Each chunk starts ``chunk_duration - overlap`` seconds after the previous
    one. The final chunk may be shorter than ``chunk_duration``.

    Args:
        audio: Audio array
        sample_rate: Sample rate in Hz
        chunk_duration: Chunk duration in seconds
        overlap: Overlap between chunks in seconds

    Returns:
        List of (chunk_audio, start_time, end_time) tuples; empty for empty audio

    Raises:
        ValueError: If the chunk length rounds down to zero samples (or is
            negative), or if the overlap is not smaller than the chunk
            duration while more than one chunk is needed
    """
    total_samples = len(audio)
    if total_samples == 0:
        return []

    chunk_samples = int(chunk_duration * sample_rate)
    overlap_samples = int(overlap * sample_rate)
    step_samples = chunk_samples - overlap_samples

    if chunk_samples <= 0:
        raise ValueError(
            f"chunk_duration must cover at least one sample (got {chunk_duration}s at {sample_rate}Hz)"
        )
    # A non-positive step never advances, which would loop forever as soon as
    # a second chunk is required. A single chunk covering all audio is fine.
    if step_samples <= 0 and total_samples > chunk_samples:
        raise ValueError(
            f"overlap ({overlap}s) must be smaller than chunk_duration ({chunk_duration}s)"
        )

    chunks = []
    position = 0
    while position < total_samples:
        chunk_end = min(position + chunk_samples, total_samples)
        chunks.append((audio[position:chunk_end], position / sample_rate, chunk_end / sample_rate))

        # Stop once a chunk has reached the end, so the overlap does not
        # produce a trailing chunk that is fully contained in this one.
        if chunk_end >= total_samples:
            break
        position += step_samples

    return chunks
# ===== M4A/WAV Conversion Utilities (T007-T008) =====
def convert_m4a_to_wav(
    input_path: str, output_path: Optional[str] = None, sample_rate: int = 16000
) -> str:
    """
    Transcode an M4A/AAC file into a mono WAV file with FFmpeg.

    The module uses this to produce WAV input for pyannote.audio.

    Args:
        input_path: Path to input M4A/AAC file
        output_path: Path for output WAV file; when None, the input path with
            its extension replaced by ".wav" is used
        sample_rate: Target sample rate in Hz (default: 16000 for pyannote)

    Returns:
        Path to converted WAV file

    Raises:
        AudioIOError: If the input is missing, conversion fails, or FFmpeg is
            not available
    """
    import subprocess

    source = Path(input_path)
    if not source.exists():
        raise AudioIOError(f"Input file not found: {source}")

    # Default destination sits next to the source file
    destination = source.with_suffix(".wav") if output_path is None else Path(output_path)
    destination.parent.mkdir(parents=True, exist_ok=True)

    ffmpeg_args = ["ffmpeg", "-i", str(source)]
    ffmpeg_args += ["-ar", str(sample_rate)]  # resample to the requested rate
    ffmpeg_args += ["-ac", "1"]  # downmix to mono
    ffmpeg_args += ["-y", str(destination)]  # overwrite any existing output

    try:
        subprocess.run(ffmpeg_args, capture_output=True, text=True, check=True)
    except FileNotFoundError:
        # The ffmpeg executable itself could not be launched
        raise AudioIOError(
            "FFmpeg not found. Please install FFmpeg: https://ffmpeg.org/download.html"
        )
    except subprocess.CalledProcessError as e:
        raise AudioIOError(f"FFmpeg conversion failed: {e.stderr}")

    logger.info(f"Converted {source.name} to WAV at {sample_rate}Hz")
    return str(destination)
def convert_wav_to_m4a(
    input_path: str, output_path: str, sample_rate: int = 44100, bitrate: str = "192k"
) -> str:
    """
    Encode a WAV file as M4A/AAC with FFmpeg.

    Intended for exporting final processed audio. Requested sample rates above
    48000 Hz are reduced to 48000 Hz with a logged warning.

    Args:
        input_path: Path to input WAV file
        output_path: Path for output M4A file
        sample_rate: Target sample rate in Hz (default: 44100, max 48000 for M4A)
        bitrate: Target bitrate (default: "192k")

    Returns:
        Path to converted M4A file

    Raises:
        AudioIOError: If the input is missing, conversion fails, or FFmpeg is
            not available
    """
    import subprocess

    source = Path(input_path)
    destination = Path(output_path)

    if not source.exists():
        raise AudioIOError(f"Input file not found: {source}")

    # Cap the rate at the 48 kHz limit this module applies to M4A output
    effective_sr = sample_rate
    if effective_sr > 48000:
        logger.warning(f"Sample rate {sample_rate}Hz exceeds M4A limit, using 48000Hz")
        effective_sr = 48000

    destination.parent.mkdir(parents=True, exist_ok=True)

    ffmpeg_args = ["ffmpeg", "-i", str(source)]
    ffmpeg_args += ["-ar", str(effective_sr)]  # resample to the (capped) rate
    ffmpeg_args += ["-b:a", bitrate]  # audio bitrate
    ffmpeg_args += ["-c:a", "aac"]  # AAC encoder
    ffmpeg_args += ["-y", str(destination)]  # overwrite any existing output

    try:
        subprocess.run(ffmpeg_args, capture_output=True, text=True, check=True)
    except FileNotFoundError:
        # The ffmpeg executable itself could not be launched
        raise AudioIOError(
            "FFmpeg not found. Please install FFmpeg: https://ffmpeg.org/download.html"
        )
    except subprocess.CalledProcessError as e:
        raise AudioIOError(f"FFmpeg conversion failed: {e.stderr}")

    logger.info(f"Converted {source.name} to M4A at {effective_sr}Hz, {bitrate}")
    return str(destination)
# ===== Audio Quality Validation (T009) =====
def validate_audio_quality(
    audio: np.ndarray, sample_rate: int, file_path: Optional[str] = None
) -> dict:
    """
    Compute simple quality metrics for an audio array and judge its validity.

    Checks performed:
    - Signal-to-noise estimate: ratio of the 90th to the 10th percentile of
      the absolute sample values, in dB
    - Clipping: share of samples with |x| > 0.99
    - RMS energy level
    - Duration

    Args:
        audio: Audio array
        sample_rate: Sample rate in Hz
        file_path: Optional file path, used only in log messages

    Returns:
        Dictionary with quality metrics and validation results:
        {
            'snr_db': float,          # Estimated signal-to-noise ratio in dB
            'is_clipped': bool,       # True if more than 1% of samples clip
            'clipping_ratio': float,  # Fraction (0-1) of clipped samples
            'rms_energy': float,      # RMS energy level
            'is_too_quiet': bool,     # True if RMS energy is below 0.01
            'duration': float,        # Duration in seconds
            'is_valid': bool,         # Overall validation result
            'warnings': list,         # List of warning messages
        }
        Empty audio yields is_valid=False with zeroed metrics and a single
        "Audio is empty" warning.
    """
    file_desc = f" for {file_path}" if file_path else ""
    num_samples = len(audio)

    # Empty input has no percentiles and would divide by zero below.
    if num_samples == 0:
        metrics = {
            "duration": 0.0,
            "warnings": ["Audio is empty"],
            "snr_db": 0.0,
            "is_clipped": False,
            "clipping_ratio": 0.0,
            "rms_energy": 0.0,
            "is_too_quiet": True,
            "is_valid": False,
        }
        logger.warning(
            f"Audio quality validation failed{file_desc}: " + ", ".join(metrics["warnings"])
        )
        return metrics

    metrics = {"duration": num_samples / sample_rate, "warnings": []}
    magnitude = np.abs(audio)

    # Calculate SNR estimate (both percentiles in one pass over the data)
    noise_floor, signal_peak = np.percentile(magnitude, [10, 90])
    # All-zero audio gives log10(0) = -inf; that value is kept, only NumPy's
    # divide-by-zero RuntimeWarning is silenced.
    with np.errstate(divide="ignore"):
        snr_db = 20 * np.log10(signal_peak / (noise_floor + 1e-10))
    metrics["snr_db"] = float(snr_db)
    if snr_db < 15:
        metrics["warnings"].append(f"Low SNR ({snr_db:.1f} dB < 15 dB)")

    # Check for clipping
    clipping_threshold = 0.99
    clipped_samples = np.count_nonzero(magnitude > clipping_threshold)
    clipping_ratio = clipped_samples / num_samples
    # bool()/float(): store plain Python values rather than NumPy scalars, as
    # the documented return types promise.
    metrics["is_clipped"] = bool(clipping_ratio > 0.01)
    metrics["clipping_ratio"] = float(clipping_ratio)
    if metrics["is_clipped"]:
        metrics["warnings"].append(f"Audio has clipping ({clipping_ratio * 100:.1f}% of samples)")

    # Check RMS energy
    rms_energy = np.sqrt(np.mean(audio**2))
    metrics["rms_energy"] = float(rms_energy)
    metrics["is_too_quiet"] = bool(rms_energy < 0.01)
    if metrics["is_too_quiet"]:
        metrics["warnings"].append(f"Audio is too quiet (RMS: {rms_energy:.4f})")

    # Check duration
    if metrics["duration"] < 1.0:
        metrics["warnings"].append(f"Audio is very short ({metrics['duration']:.1f}s)")

    # Overall validation. The hard limits are looser than the warning
    # thresholds above (SNR 10 vs 15 dB, duration 0.5 vs 1.0 s), so a result
    # can be valid and still carry warnings.
    metrics["is_valid"] = bool(
        snr_db >= 10  # Minimum acceptable SNR
        and not metrics["is_clipped"]
        and not metrics["is_too_quiet"]
        and metrics["duration"] > 0.5
    )

    # Log results
    if metrics["is_valid"]:
        logger.debug(f"Audio quality validation passed{file_desc}")
    else:
        logger.warning(
            f"Audio quality validation failed{file_desc}: " + ", ".join(metrics["warnings"])
        )

    return metrics